Treasure Data社のOSSワークフローエンジン『Digdag』を試してみた #digdag
Digdag が Apache License 2.0 の元でオープンソース化されましたよ! さぁ試すんだ…! 今すぐにでも! https://t.co/Uzc4a5GLCe ドキュメント:https://t.co/PF8wy5KHln
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
という訳で試してみました。注目度の高かったワークフローエンジン『Digdag』がついにOSS化されました!Githubリポジトリ及びドキュメントは以下となります。
目次
インストール
環境の準備
今回は手っ取り早く触ってみよう!という事でEC2環境(Amazon Linux)を用意しました。
$ ssh -i xxxxxxx.pem ec2-user@xx.xxx.xxx.xx Last login: Wed Jun 15 15:03:28 2016 from xxx.xxx.xxx.xxx __| __|_ ) _| ( / Amazon Linux AMI ___|\___|___| https://aws.amazon.com/amazon-linux-ami/2016.03-release-notes/ $ sudo yum -y update
Digdagのインストール実施
導入はとても簡単。ソースコードを入手し、実行権限を付与。
$ sudo curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0 100 18.6M 100 18.6M 0 0 1201k 0 0:00:15 0:00:15 --:--:-- 690k $ $ sudo chmod +x /usr/local/bin/digdag $
digdag --helpを実行してみます。おや?エラーが出ますね...
$ digdag --help Exception in thread "main" java.lang.UnsupportedClassVersionError: io/digdag/cli/Main : Unsupported major.minor version 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:803) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482) $
このエラーに関する記述もドキュメントに記載されていました。Java8(8u72)以上の環境が必要となるようです。
$ java -version java version "1.7.0_101" OpenJDK Runtime Environment (amzn-2.6.6.1.67.amzn1-x86_64 u101-b00) OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
既存Java環境をアンインストールし、
$ sudo yum -y remove java $ java -version -bash: /usr/bin/java: そのようなファイルやディレクトリはありません
RPMパッケージを入手、新しいバージョンのJavaをインストール。
$ sudo rpm -ivh jdk-8u92-linux-x64.rpm 準備しています... ################################# [100%] 更新中 / インストール中... 1:jdk1.8.0_92-2000:1.8.0_92-fcs ################################# [100%] Unpacking JAR files... tools.jar... plugin.jar... javaws.jar... deploy.jar... rt.jar... jsse.jar... charsets.jar... localedata.jar... $ java -version java version "1.8.0_92" Java(TM) SE Runtime Environment (build 1.8.0_92-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode) $
再度digdag --helpコマンドを実施。今度はちゃんとヘルプが表示されました!これでインストール完了です。
$ digdag --help 2016-06-15 15:35:29 +0000: Digdag v0.8.1 Usage: digdag <command> [options...] Local-mode commands: new <path> create a new workflow project r[un] <workflow.dig> run a workflow c[heck] show workflow definitions sched[uler] run a scheduler server selfupdate update digdag to the latest version Server-mode commands: server start digdag server Client-mode commands: push <project-name> create and upload a new revision start <project-name> <name> start a new session attempt of a workflow retry <attempt-id> retry a session kill <attempt-id> kill a running session attempt backfill start sessions of a schedule for past times reschedule skip sessions of a schedule to a future time log <attempt-id> show logs of a session attempt workflows [project-name] [name] show registered workflow definitions schedules show registered schedules sessions show sessions for all workflows sessions <project-name> show sessions for all workflows in a project sessions <project-name> <name> show sessions for a workflow session <session-id> show a single session attempts show attempts for all sessions attempts <session-id> show attempts for a session attempt <attempt-id> show a single attempt tasks <attempt-id> show tasks of a session attempt version show client and server version Options: -L, --log PATH output log messages to a file (default: -) -l, --log-level LEVEL log level (error, warn, info, debug or trace) -X KEY=VALUE add a performance system config -c, --config PATH.properties Configuration file (default: /home/ec2-user/.config/digdag/config) Use `<command> --help` to see detailed usage of a command. $
サンプルワークフローの実行
サンプルワークフローの実行についてもドキュメントでカバーされていますのでこちらも早速試してみます。
digdag initでワークフローの初期化/作成、
$ digdag init cmdag 2016-06-15 15:38:34 +0000: Digdag v0.8.1 Creating cmdag/.gitignore Creating cmdag/tasks/shell_sample.sh Creating cmdag/tasks/repeat_hello.sh Creating cmdag/tasks/__init__.py Creating cmdag/cmdag.dig Done. Type `cd cmdag` and then `digdag run cmdag.dig` to run the workflow. Enjoy!
作成されたフォルダに移動し、digdag runコマンドで拡張子*.digを指定して実行。幾つかログが表示された後、処理が正常終了しました。
$ cd cmdag/ $ digdag run cmdag.dig 2016-06-15 15:40:44 +0000: Digdag v0.8.1 2016-06-15 15:40:46 +0000 [WARN] (main): Using a new session time 2016-06-15T00:00:00+00:00. 2016-06-15 15:40:46 +0000 [INFO] (main): Using session .digdag/status/20160615T000000+0000. 2016-06-15 15:40:46 +0000 [INFO] (main): Starting a new session project id=1 workflow name=cmdag session_time=2016-06-15T00:00:00+00:00 2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step1): sh>: tasks/shell_sample.sh Step1 of session 2016-06-15T00:00:00+00:00 2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step2+worker1): sh>: tasks/repeat_hello.sh Hello, world! from process 3334 2016-06-15 15:40:47 +0000 [INFO] (0019@+cmdag+step2+worker2): sh>: tasks/repeat_hello.sh Hello, world! from process 3336 Hello, world! from process 3334 Hello, world! from process 3336 Hello, world! from process 3334 Hello, world! from process 3336 Hello, world! from process 3334 Hello, world! from process 3336 2016-06-15 15:40:51 +0000 [INFO] (0019@+cmdag+step3): py>: tasks.MyWorkflow.step3 Step3 of session 2016-06-15T00:00:00+00:00 Success. Task state is saved at .digdag/status/20160615T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks. $
実行結果も気になりますが、まずは実行時に指定したファイルを見てみましょう。非常にシンプルな設定内容で何やらジョブ等が設定されています。step2の部分は_parallel: trueと指定があるので並行処理をさせているというのも把握出来ますね。
$ cat cmdag.dig timezone: UTC _export: hello: "Hello, world!" +step1: sh>: tasks/shell_sample.sh +step2: _parallel: true +worker1: sh>: tasks/repeat_hello.sh +worker2: sh>: tasks/repeat_hello.sh +step3: # defined at tasks/__init__.py py>: tasks.MyWorkflow.step3 $
設定ファイルで指定されていた実行ファイルの内容も確認して見ます。*.digファイルで定義した変数(hello)がこちらのファイルで受け渡されて出力されている様ですね。
$ cd tasks/ $ ll 合計 16 -rw-rw-r-- 1 ec2-user ec2-user 159 6月 15 15:38 __init__.py -rw-rw-r-- 1 ec2-user ec2-user 578 6月 15 15:40 __init__.pyc -rwxrw-r-- 1 ec2-user ec2-user 167 6月 15 15:38 repeat_hello.sh -rwxrw-r-- 1 ec2-user ec2-user 49 6月 15 15:38 shell_sample.sh $ cat shell_sample.sh #!/bin/sh echo "Step1 of session $session_time" $ cat repeat_hello.sh #!/bin/sh echo "$hello from process $$" sleep 1 echo "$hello from process $$" sleep 1 echo "$hello from process $$" sleep 1 echo "$hello from process $$" sleep 1 $
タスクのステータスに関する情報が格納されているディレクトリに移動して中身を見てみます。非常にシンプルなタスクだったのか、詳細な内容までは出力されていませんが、処理それぞれの内容(*.yml)にstate: "success"という情報が出力されているのが確認出来ます。
$ pwd /home/ec2-user/cmdag/.digdag/status/20160615T000000+0000 $ ll 合計 24 -rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step1.yml -rw-rw-r-- 1 ec2-user ec2-user 156 6月 15 15:40 +cmdag+step2+worker1.yml -rw-rw-r-- 1 ec2-user ec2-user 156 6月 15 15:40 +cmdag+step2+worker2.yml -rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step2.yml -rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step3.yml -rw-rw-r-- 1 ec2-user ec2-user 142 6月 15 15:40 +cmdag.yml $ cat +cmdag.yml fullName: "+cmdag" state: "success" result: subtaskConfig: {} exportParams: {} storeParams: {} report: inputs: [] outputs: [] $ cat +cmdag+step1.yml fullName: "+cmdag+step1" state: "success" result: subtaskConfig: {} exportParams: {} storeParams: {} report: inputs: [] outputs: [] $ cat +cmdag+step2+worker1.yml fullName: "+cmdag+step2+worker1" state: "success" result: subtaskConfig: {} exportParams: {} storeParams: {} report: inputs: [] outputs: [] $
その他ドキュメントの内容について
『Getting Started』に関する部分は上記の内容となります。公式ドキュメントにはその他以下の様なセクションで内容が構成されています。読み応えがありますがとても分かり易く解説されている印象です。
- Architecture
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Python
- Language API - Ruby
- Release Notes
Digdagの生みの親:古橋さんの関連ツイートが特徴を端的に紹介されているので、併せて引用させて頂きます。
Digdag、まずはEmbulkによるETL処理の自動化に最適。複数のデータソースから並列または直列にデータロード→日付ごとにテーブル作成→一次集計とJOIN…という処理を直感的に記述できる。 #digdag pic.twitter.com/IDEV2eVbjZ
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
パラメータ化が可能。引数やファイルから受け取ったパラメータを、設定ファイルや引数に埋め込んでからコマンドを実行できる。同じような処理を同じテーブルや同じデータソースに対して適用したい場合に効果的。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
for_each> や if> などのフロー制御ができ、プログラマブルなワークフローを組み立てられる。外部のスクリプトを呼んでパラメータをとってきて、その値に応じて次のアクションを変えたり、ループさせたりできる。https://t.co/Y7v21U3Sek #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
Dockerイメージを指定してタスクを実行できる、スケジューラ内蔵なのでcronがいらない、分散実行に対応、実行時間超過/失敗時のエラー通知、etc etc。今ワークフローエンジンをちゃんと作るとこうなるよね、という機能がほぼあるはず。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
正直なところ、コードはEmbulkより良くできているので、メンテナンスが楽そう…チームメンバーも2人いるし。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
参考資料
更にDigdagを理解する上での助けとなる情報は以下の通り。過去イベントで発表されたスライド資料及び動画、また、古橋さんの事前インタビュー記事も必見です。
スライド資料: 分散ワークフローエンジン『DigDag』の実装 at Tokyo RubyKaigi #11
YouTube動画: 【基調講演】分散ワークフローエンジン『Digdag』の実装
関連インタビュー記事
まとめ
以上、新しくリリースされたTreasure Data社によるOSSプロジェクト『Digdag』に関する『やってみた』エントリでした。上記で試した内容はまだほんの触りの部分ですので、これから折を見てドキュメントを読みながらDigdagによるワークフロー構築を実践して行きたいと思います!こちらからは以上です。